累计统计
准备数据
access.csv
1 | A,2015-01,5 |
准备环境
1 | object AccumulatorCount { |
具体实现逻辑
DataFrame API 方式
方式一:
1 | // rowsBetween(Long.MinValue, 0):窗口的大小是按照排序从最小值到当前行 |
方式二
1 | usersDF.select( |
sql方式
思路:根据DF算子意思,找到SqlBase.g4文件,看看是否有该类sql支持。
在SqlBase.g4文件中刚好找到如下内容
1 | windowFrame |
在spark源码sql模块core项目org.apache.spark.sql.execution包中找到SQLWindowFunctionSuite类找到如下测试方法
1 | test("window function: multiple window expressions in a single expression") { |
下面就可以开心的照着案例写sql去了,真嗨皮!!!!
1 | usersDF.createOrReplaceTempView("access") |
累加N天之前,假设N=3
DataFrame API方式
1 | val preThreeAccuCntSpec = Window.partitionBy("name").orderBy("mounth").rowsBetween(-3, 0) |
sql方式
1 | spark.sql( |
累加前3天,后3天
API方式
1 | val preThreeFiveAccuCntSpec = Window.partitionBy("name").orderBy("mounth").rowsBetween(3, 3) |
sql方式
1 | spark.sql( |
基本窗口函数案例
准备环境
1 | object WindowFunctionTest extends BaseSparkSession { |
平均移动值
DataFrame API方式实现
1 | // 窗口定义从 -1(前一行)到 1(后一行) ,每一个滑动的窗口总用有3行 |
sql方式实现
1 | df.createOrReplaceTempView("site_info") |
前一行数据
DataFrame API方式实现
1 | val lagwSpec = Window.partitionBy("site").orderBy("date") |
sql方式实现
1 | df.createOrReplaceTempView("site_info") |
排名
DataFrame API方式实现
1 | val rankwSpec = Window.partitionBy("site").orderBy("date") |
sql方式
1 | spark.sql( |
分组topn和分组取最小
1 | import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} |
优雅方式定义scheme
1 | def getScheme(): StructType = { |
保存小数点后n位
10表示总的位数,2表示保留几位小数,10要>=实际的位数,否则为NULL
1 | spark.sql("select cast(sale_amount as decimal(10, 2))from ycd").show() |
重命名行
1 | import org.apache.spark.sql.SparkSession |
转置
1 | import org.apache.spark.sql.DataFrame |